/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.update;

import static org.hamcrest.core.StringContains.containsString;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.apache.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.index.NoMergePolicyFactory;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Tests the in-place updates (docValues updates) for a one shard, three replica cluster. */
public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private final boolean onlyLeaderIndexes = random().nextBoolean();

  /**
   * Static setup: selects the in-place-updates schema/config, forces {@link NoMergePolicyFactory}
   * so segments are never merged or re-ordered (later assertions prove updates were in-place by
   * checking the internal [docid] stays constant), then sanity-checks that autocommits are
   * disabled and that {@link NoMergePolicy} actually took effect on the IndexWriter.
   */
  @BeforeClass
  public static void beforeSuperClass() throws Exception {
    schemaString = "schema-inplace-updates.xml";
    configString = "solrconfig-tlog.xml";

    // we need consistent segments that aren't re-ordered on merge because we're
    // asserting inplace updates happen by checking the internal [docid]
    systemSetPropertySolrTestsMergePolicyFactory(NoMergePolicyFactory.class.getName());

    initCore(configString, schemaString);

    // sanity check that autocommits are disabled
    assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
    assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
    assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
    assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);

    // assert that NoMergePolicy was chosen
    RefCounted<IndexWriter> iw = h.getCore().getSolrCoreState().getIndexWriter(h.getCore());
    try {
      IndexWriter writer = iw.get();
      assertTrue(
          "Actual merge policy is: " + writer.getConfig().getMergePolicy(),
          writer.getConfig().getMergePolicy() instanceof NoMergePolicy);
    } finally {
      // getIndexWriter() is ref-counted; always release even if the assertion throws
      iw.decref();
    }
  }

  /** Disabled: see TODO below. */
  @Override
  protected boolean useTlogReplicas() {
    // TODO: tlog replicas makes commits take way too long due to what is likely a bug and its
    // TestInjection use
    return false;
  }

  /** Configures the cluster shape: one shard ({@code sliceCount = 1}) across three nodes. */
  public TestInPlaceUpdatesDistrib() {
    super();
    sliceCount = 1;
    fixShardCount(3);
  }

  // Client pointed at the shard1 leader node; assigned by mapReplicasToClients().
  private SolrClient LEADER = null;
  // Clients pointed at the two non-leader replicas (same shard); assigned by mapReplicasToClients().
  private List<SolrClient> NONLEADERS = null;

  /**
   * Single entry point that runs every scenario sequentially against the shared cluster: waits
   * for recovery, maps clients to leader/non-leader replicas, sanity-checks the schema
   * assumptions, then runs each sub-test with inter-node update delays reset in between.
   */
  @Test
  @ShardsFixed(num = 3)
  public void test() throws Exception {
    waitForRecoveriesToFinish(true);

    resetDelays();

    mapReplicasToClients();

    clearIndex();
    commit();

    // sanity check no one broke the assumptions we make about our schema
    // (in-place-updatable fields must be docValues-only: not stored, not indexed)
    checkExpectedSchemaField(
        map(
            "name",
            "inplace_updatable_int",
            "type",
            "int",
            "stored",
            Boolean.FALSE,
            "indexed",
            Boolean.FALSE,
            "docValues",
            Boolean.TRUE));
    checkExpectedSchemaField(
        map(
            "name",
            "inplace_updatable_float",
            "type",
            "float",
            "stored",
            Boolean.FALSE,
            "indexed",
            Boolean.FALSE,
            "docValues",
            Boolean.TRUE));
    checkExpectedSchemaField(
        map(
            "name",
            "_version_",
            "type",
            "long",
            "stored",
            Boolean.FALSE,
            "indexed",
            Boolean.FALSE,
            "docValues",
            Boolean.TRUE));

    // Do the tests now:

    // AwaitsFix this test fails easily
    // delayedReorderingFetchesMissingUpdateFromLeaderTest();

    resetDelays();
    docValuesUpdateTest();
    resetDelays();
    ensureRtgWorksWithPartialUpdatesTest();
    resetDelays();
    outOfOrderUpdatesIndividualReplicaTest();
    resetDelays();
    updatingDVsInAVeryOldSegment();
    resetDelays();
    updateExistingThenNonExistentDoc();
    resetDelays();
    // TODO Should we combine all/some of these into a single test, so as to cut down on execution
    // time?
    reorderedDBQIndividualReplicaTest();
    resetDelays();
    reorderedDeletesTest();
    resetDelays();
    reorderedDBQsSimpleTest();
    resetDelays();
    reorderedDBQsResurrectionTest();
    resetDelays();
    setNullForDVEnabledField();
    resetDelays();

    // AwaitsFix this test fails easily
    // reorderedDBQsUsingUpdatedValueFromADroppedUpdate();
  }

  /** Clears any artificial update delay configured on every Jetty node's debug filter. */
  private void resetDelays() {
    jettys.forEach(jetty -> jetty.getDebugFilter().unsetDelay());
  }

  /**
   * Populates {@link #LEADER} and {@link #NONLEADERS} by matching each test client's base URL
   * against the node hosting each replica of shard1. Asserts exactly one leader client was found
   * and exactly two non-leader clients.
   */
  private void mapReplicasToClients() throws KeeperException, InterruptedException {
    ZkStateReader zkStateReader = ZkStateReader.from(cloudClient);
    zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
    ClusterState clusterState = zkStateReader.getClusterState();
    Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1);
    Replica leader = shard1.getLeader();

    // find the client whose base URL points at the leader's node
    String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName());
    for (SolrClient candidate : clients) {
      if (((HttpSolrClient) candidate).getBaseURL().startsWith(leaderBaseUrl)) {
        LEADER = candidate;
      }
    }

    // every remaining replica maps to a non-leader client
    NONLEADERS = new ArrayList<>();
    for (Replica replica : shard1.getReplicas()) {
      if (replica.equals(leader)) {
        continue;
      }
      String replicaBaseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
      for (SolrClient candidate : clients) {
        if (((HttpSolrClient) candidate).getBaseURL().startsWith(replicaBaseUrl)) {
          NONLEADERS.add(candidate);
        }
      }
    }

    assertNotNull(LEADER);
    assertEquals(2, NONLEADERS.size());
  }

  /**
   * Verifies that an atomic {@code set=null} on the docValues-enabled in-place field clears the
   * value on the leader and on both non-leader replicas (checked via non-distrib RTG).
   *
   * <p>Note: should this test be here? A {@code set=null} is an atomic update.
   */
  private void setNullForDVEnabledField() throws Exception {
    clearIndex();
    commit();

    buildRandomIndex(0);

    // first give the field a concrete value via an atomic "set"
    float setValue = 1;
    index("id", 0, "inplace_updatable_float", map("set", setValue));
    LEADER.commit();

    SolrDocument leaderDoc = LEADER.getById("0"); // RTG straight from the index
    assertEquals(setValue, leaderDoc.get("inplace_updatable_float"));
    assertEquals("title0", leaderDoc.get("title_s"));
    long versionAfterSet = (long) leaderDoc.get("_version_");

    for (SolrClient replica : NONLEADERS) {
      SolrDocument replicaDoc = replica.getById(String.valueOf(0), params("distrib", "false"));
      assertEquals(setValue, replicaDoc.get("inplace_updatable_float"));
      assertEquals(versionAfterSet, replicaDoc.get("_version_"));
    }

    // ...then null it out and confirm it is gone everywhere
    index("id", 0, "inplace_updatable_float", map("set", null));
    LEADER.commit();

    leaderDoc = LEADER.getById("0"); // RTG straight from the index
    assertNull(leaderDoc.get("inplace_updatable_float"));
    assertEquals("title0", leaderDoc.get("title_s"));
    long versionAfterNull = (long) leaderDoc.get("_version_");

    for (SolrClient replica : NONLEADERS) {
      SolrDocument replicaDoc = replica.getById(String.valueOf(0), params("distrib", "false"));
      assertNull(replicaDoc.get("inplace_updatable_float"));
      assertEquals(versionAfterNull, replicaDoc.get("_version_"));
    }
  }

  final int NUM_RETRIES = 100, WAIT_TIME = 50;

  /**
   * The following should work: full update to doc 0, in-place update for doc 0, delete doc 0.
   * NONLEADERS.get(1) receives the three simulated updates well-ordered; NONLEADERS.get(0)
   * receives them shuffled across parallel threads. Afterwards both replicas must agree that the
   * doc is gone.
   */
  private void reorderedDBQsSimpleTest() throws Exception {

    clearIndex();
    commit();

    buildRandomIndex(0);

    float inplace_updatable_float = 1;

    // update doc, set
    index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));

    LEADER.commit();
    SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
    assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
    assertEquals("title0", sdoc.get("title_s"));
    long version0 = (long) sdoc.get("_version_");

    // put replica out of sync: build three simulated leader-forwarded updates with
    // explicit, consecutive _version_ values
    float newinplace_updatable_float = 100;
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        simulatedUpdateRequest(
            null,
            "id",
            0,
            "title_s",
            "title0_new",
            "inplace_updatable_float",
            newinplace_updatable_float,
            "_version_",
            version0 + 1)); // full update
    updates.add(
        simulatedUpdateRequest(
            version0 + 1,
            "id",
            0,
            "inplace_updatable_float",
            newinplace_updatable_float + 1,
            "_version_",
            version0 + 2)); // inplace_updatable_float=101
    updates.add(simulatedDeleteRequest(0, version0 + 3));

    // order the updates correctly for NONLEADER 1
    for (UpdateRequest update : updates) {
      if (log.isInfoEnabled()) {
        log.info("Issuing well ordered update: {}", update.getDocuments());
      }
      NONLEADERS.get(1).request(update);
    }

    // Reordering needs to happen using parallel threads
    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));

    // re-order the updates for NONLEADER 0
    List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
    Collections.shuffle(reorderedUpdates, random());
    List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
    for (UpdateRequest update : reorderedUpdates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
      updateResponses.add(threadpool.submit(task));
      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(10);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    // assert all requests were successful
    for (Future<UpdateResponse> resp : updateResponses) {
      assertEquals(0, resp.get().getStatus());
    }

    // assert both replicas have same effect
    for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
      SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
      assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
    }

    log.info("reorderedDBQsSimpleTest: This test passed fine...");
  }

  /**
   * Sends a simulated full update, an in-place update, and a delete-by-query (matching the
   * in-place-updated value) to a single replica with the last two swapped, racing them on
   * parallel threads; the replica must still converge on the doc being deleted. Skipped when
   * using tlog replicas (see log message below).
   */
  private void reorderedDBQIndividualReplicaTest() throws Exception {
    if (onlyLeaderIndexes) {
      log.info("RTG with DBQs are not working in tlog replicas");
      return;
    }
    clearIndex();
    commit();

    // put replica out of sync: simulated leader-forwarded updates with a fixed base version
    // (no real doc exists beforehand, hence the arbitrary version0)
    float newinplace_updatable_float = 100;
    long version0 = 2000;
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        simulatedUpdateRequest(
            null,
            "id",
            0,
            "title_s",
            "title0_new",
            "inplace_updatable_float",
            newinplace_updatable_float,
            "_version_",
            version0 + 1)); // full update
    updates.add(
        simulatedUpdateRequest(
            version0 + 1,
            "id",
            0,
            "inplace_updatable_float",
            newinplace_updatable_float + 1,
            "_version_",
            version0 + 2)); // inplace_updatable_float=101
    updates.add(
        simulatedDeleteRequest(
            "inplace_updatable_float:" + (newinplace_updatable_float + 1), version0 + 3));

    // Reordering needs to happen using parallel threads
    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));

    // re-order the updates by swapping the last two
    List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
    reorderedUpdates.set(1, updates.get(2));
    reorderedUpdates.set(2, updates.get(1));

    List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
    for (UpdateRequest update : reorderedUpdates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
      updateResponses.add(threadpool.submit(task));
      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(100);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    // assert all requests were successful
    for (Future<UpdateResponse> resp : updateResponses) {
      assertEquals(0, resp.get().getStatus());
    }

    // despite the reorder, the DBQ must ultimately win: the doc should be gone
    SolrDocument doc = NONLEADERS.get(0).getById(String.valueOf(0), params("distrib", "false"));
    assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);

    log.info("reorderedDBQIndividualReplicaTest: This test passed fine...");
    clearIndex();
    commit();
  }

  /**
   * Core in-place update test: builds a random index, records each test doc's internal [docid]
   * and float value, then applies atomic "set" and "inc" updates in shuffled order, plus a
   * first-ever update of a second docValues field. After every round, all clients must report
   * the new values with the ORIGINAL [docid]s -- proving no document was re-indexed.
   */
  private void docValuesUpdateTest() throws Exception {
    // number of docs we're testing (0 <= id), index may contain additional random docs (id < 0)
    int numDocs = atLeast(100);
    if (onlyLeaderIndexes) numDocs = TestUtil.nextInt(random(), 10, 50);
    log.info("Trying num docs = {}", numDocs);
    final List<Integer> ids = new ArrayList<Integer>(numDocs);
    for (int id = 0; id < numDocs; id++) {
      ids.add(id);
    }

    buildRandomIndex(101.0F, ids);

    // capture the baseline [docid] and float value of each test doc, sorted by id
    List<Integer> luceneDocids = new ArrayList<>(numDocs);
    List<Number> valuesList = new ArrayList<>(numDocs);
    SolrParams params =
        params(
            "q",
            "id:[0 TO *]",
            "fl",
            "*,[docid]",
            "rows",
            String.valueOf(numDocs),
            "sort",
            "id_i asc");
    SolrDocumentList results = LEADER.query(params).getResults();
    assertEquals(numDocs, results.size());
    for (SolrDocument doc : results) {
      luceneDocids.add((Integer) doc.get("[docid]"));
      valuesList.add((Float) doc.get("inplace_updatable_float"));
    }
    log.info("Initial results: {}", results);

    // before we do any atomic operations, sanity check our results against all clients
    assertDocIdsAndValuesAgainstAllClients(
        "sanitycheck", params, luceneDocids, "inplace_updatable_float", valuesList);

    // now we're going to overwrite the value for all of our testing docs
    // giving them a value between -5 and +5
    for (int id : ids) {
      // NOTE: in rare cases, this may be setting the value to 0, on a doc that
      // already had an init value of 0 -- which is an interesting edge case, so we don't exclude it
      final float multiplier = r.nextBoolean() ? -5.0F : 5.0F;
      final float value = r.nextFloat() * multiplier;
      assertTrue(-5.0F <= value && value <= 5.0F);
      valuesList.set(id, value);
    }
    log.info("inplace_updatable_float: {}", valuesList);

    // update doc w/ set
    Collections.shuffle(ids, r); // so updates aren't applied in index order
    for (int id : ids) {
      index("id", id, "inplace_updatable_float", map("set", valuesList.get(id)));
    }

    commit();

    assertDocIdsAndValuesAgainstAllClients(
        "set",
        SolrParams.wrapDefaults(
            params(
                "q", "inplace_updatable_float:[-5.0 TO 5.0]",
                "fq", "id:[0 TO *]"),
            // existing sort & fl that we want...
            params),
        luceneDocids,
        "inplace_updatable_float",
        valuesList);

    // update doc, w/increment
    log.info("Updating the documents...");
    Collections.shuffle(ids, r); // so updates aren't applied in the same order as our 'set'
    for (int id : ids) {
      // all increments will use some value X such that 20 < abs(X)
      // thus ensuring that after all increments are done, there should be
      // 0 test docs matching the query inplace_updatable_float:[-10 TO 10]
      final float inc = (r.nextBoolean() ? -1.0F : 1.0F) * (r.nextFloat() + (float) atLeast(20));
      assertTrue(20 < Math.abs(inc));
      final float value = (float) valuesList.get(id) + inc;
      assertTrue(value < -10 || 10 < value);

      valuesList.set(id, value);
      index("id", id, "inplace_updatable_float", map("inc", inc));
    }
    commit();

    assertDocIdsAndValuesAgainstAllClients(
        "inc",
        SolrParams.wrapDefaults(
            params(
                "q", "-inplace_updatable_float:[-10.0 TO 10.0]",
                "fq", "id:[0 TO *]"),
            // existing sort & fl that we want...
            params),
        luceneDocids,
        "inplace_updatable_float",
        valuesList);

    // first-ever update of a second docValues field -- must also be in-place
    log.info("Updating the documents with new field...");
    Collections.shuffle(ids, r);
    for (int id : ids) {
      final int val = random().nextInt(20);
      valuesList.set(id, val);
      index("id", id, "inplace_updatable_int", map((random().nextBoolean() ? "inc" : "set"), val));
    }
    commit();

    assertDocIdsAndValuesAgainstAllClients(
        "inplace_for_first_field_update",
        SolrParams.wrapDefaults(
            params("q", "inplace_updatable_int:[* TO *]", "fq", "id:[0 TO *]"), params),
        luceneDocids,
        "inplace_updatable_int",
        valuesList);
    log.info("docValuesUpdateTest: This test passed fine...");
  }

  /**
   * Ingest many documents across many segments, then apply an in-place "inc" to a document
   * sitting in the oldest segment. Verifies the updated value and untouched stored field on all
   * three replicas, both before a commit (RTG from tlog) and after (RTG from the index).
   */
  private void updatingDVsInAVeryOldSegment() throws Exception {
    clearIndex();
    commit();

    String targetId = String.valueOf(Integer.MAX_VALUE);
    index("id", targetId, "inplace_updatable_float", "1", "title_s", "newtitle");

    // create 10 more segments
    for (int segment = 0; segment < 10; segment++) {
      buildRandomIndex(101.0F, Collections.emptyList());
    }

    index("id", targetId, "inplace_updatable_float", map("inc", "1"));

    // check all replicas twice: once pre-commit, once post-commit
    for (int phase = 0; phase < 2; phase++) {
      for (SolrClient client : Arrays.asList(LEADER, NONLEADERS.get(0), NONLEADERS.get(1))) {
        assertEquals("newtitle", client.getById(targetId).get("title_s"));
        assertEquals(2.0f, client.getById(targetId).get("inplace_updatable_float"));
      }
      if (phase == 0) {
        commit();
      }
    }

    log.info("updatingDVsInAVeryOldSegment: This test passed fine...");
  }

  /**
   * Test scenario:
   *
   * <ul>
   *   <li>Send a single batch of documents to one node
   *   <li>The batch consists of an in-place update for a document that exists and one for a
   *       document that does not exist
   *   <li>Both updates must be applied: the existing document's field is updated, and a new
   *       document is created for the non-existent id
   * </ul>
   */
  private void updateExistingThenNonExistentDoc() throws Exception {
    clearIndex();
    index("id", 1, "inplace_updatable_float", "1", "title_s", "newtitle");
    commit();

    SolrInputDocument existing = new SolrInputDocument();
    existing.setField("id", 1);
    existing.setField("inplace_updatable_float", map("set", "50"));

    SolrInputDocument missing = new SolrInputDocument();
    missing.setField("id", 2);
    missing.setField("inplace_updatable_float", map("set", "50"));

    List<SolrInputDocument> batch = List.of(existing, missing);

    // index the batch through a randomly chosen node, then verify on every replica
    add(clients.get(random().nextInt(clients.size())), null, batch);
    commit();
    for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) {
      for (SolrInputDocument expectDoc : batch) {
        String docId = expectDoc.getFieldValue("id").toString();
        SolrDocument actualDoc = client.getById(docId);
        assertNotNull("expected to get doc by id:" + docId, actualDoc);
        assertEquals(
            "expected to update " + actualDoc, 50.0f, actualDoc.get("inplace_updatable_float"));
      }
    }
  }

  /**
   * Retries the specified 'req' against each SolrClient in "clients" until the expected number of
   * results are returned, at which point the results are verified using
   * assertDocIdsAndValuesInResults
   *
   * @param debug used in log and assertion messages
   * @param req the query to execute, should include rows &amp; sort params such that the results
   *     can be compared to luceneDocids and valuesList
   * @param luceneDocids a list of "[docid]" values to be tested against each doc in the req results
   *     (in order)
   * @param fieldName used to get value from the doc to validate with valuesList
   * @param valuesList a list of given fieldName values to be tested against each doc in results (in
   *     order)
   */
  private void assertDocIdsAndValuesAgainstAllClients(
      final String debug,
      final SolrParams req,
      final List<Integer> luceneDocids,
      final String fieldName,
      final List<Number> valuesList)
      throws Exception {
    assertEquals(luceneDocids.size(), valuesList.size());
    final long numFoundExpected = luceneDocids.size();

    for (SolrClient client : clients) {
      final String clientDebug =
          client.toString() + (LEADER.equals(client) ? " (leader)" : " (not leader)");
      final String msg = "'" + debug + "' results against client: " + clientDebug;
      SolrDocumentList results = null;
      // For each client, do a (sorted) sanity check query to confirm searcher has been re-opened
      // after our update -- if the numFound matches our expectations, then verify the inplace float
      // value and [docid] of each result doc against our expectations to ensure that the values
      // were updated properly w/o the doc being completely re-added internally. (ie: truly inplace)
      for (int attempt = 0; attempt <= NUM_RETRIES; attempt++) {
        log.info("Attempt #{} checking {}", attempt, msg);
        results = client.query(req).getResults();
        if (numFoundExpected == results.getNumFound()) {
          // searcher has caught up; go verify the per-doc details below
          break;
        }
        if (attempt == NUM_RETRIES) {
          fail(
              "Repeated retry for "
                  + msg
                  + "; Never got numFound="
                  + numFoundExpected
                  + "; results=> "
                  + results);
        }
        log.info("numFound mismatch, searcher may not have re-opened yet.  Will sleep and retry...");
        Thread.sleep(WAIT_TIME);
      }

      assertDocIdsAndValuesInResults(msg, results, luceneDocids, fieldName, valuesList);
    }
  }

  /**
   * Given a result list sorted by "id", asserts that each document's "[docid]" and the named
   * field's value match the expectations keyed by the document's own integer id.
   *
   * @param msgPre used as a prefix for assertion messages
   * @param results the sorted results of some query, such that all matches are included (ie: rows =
   *     numFound)
   * @param luceneDocids a list of "[docid]" values to be tested against each doc in results (in
   *     order)
   * @param fieldName used to get value from the doc to validate with valuesList
   * @param valuesList a list of given fieldName values to be tested against each doc in results (in
   *     order)
   */
  private void assertDocIdsAndValuesInResults(
      final String msgPre,
      final SolrDocumentList results,
      final List<Integer> luceneDocids,
      final String fieldName,
      final List<Number> valuesList) {

    assertEquals(luceneDocids.size(), valuesList.size());
    assertEquals(
        msgPre
            + ": rows param wasn't big enough, we need to compare all results matching the query",
        results.getNumFound(),
        results.size());
    assertEquals(
        msgPre + ": didn't get a result for every known docid",
        luceneDocids.size(),
        results.size());

    for (SolrDocument match : results) {
      // the doc's own id doubles as the index into the expectation lists
      final int expectedIdx = Integer.parseInt(match.get("id").toString());
      assertEquals(
          msgPre + " wrong val for " + match, valuesList.get(expectedIdx), match.get(fieldName));
      assertEquals(
          msgPre + " wrong [docid] for " + match,
          luceneDocids.get(expectedIdx),
          match.get("[docid]"));
    }
  }

  /**
   * Exercises real-time get (RTG) against a document receiving a mix of full and in-place
   * updates: verifies the value/version seen via RTG both from the tlog (uncommitted) and the
   * index (committed), asserts the internal [docid] stays constant across in-place updates on
   * every replica (only the one full re-index refreshes it), and checks that malformed
   * "set"/"inc" values fail with BAD_REQUEST.
   */
  private void ensureRtgWorksWithPartialUpdatesTest() throws Exception {
    clearIndex();
    commit();

    float inplace_updatable_float = 1;
    String title = "title100";
    long version = 0, currentVersion;

    currentVersion = buildRandomIndex(100).get(0);
    assertTrue(currentVersion > version);

    // do an initial (non-inplace) update to ensure both the float & int fields we care about have
    // (any) value that way all subsequent atomic updates will be inplace
    currentVersion =
        addDocAndGetVersion(
            "id", 100,
            "inplace_updatable_float", map("set", r.nextFloat()),
            "inplace_updatable_int", map("set", r.nextInt()));
    LEADER.commit();

    // get the internal docids of id=100 document from the three replicas
    List<Integer> docids = getInternalDocIds("100");

    // update doc, set
    currentVersion =
        addDocAndGetVersion(
            "id", 100, "inplace_updatable_float", map("set", inplace_updatable_float));
    assertTrue(currentVersion > version);
    version = currentVersion;
    LEADER.commit();
    // in-place update => [docid] must be unchanged on every replica
    assertEquals(
        "Earlier: " + docids + ", now: " + getInternalDocIds("100"),
        docids,
        getInternalDocIds("100"));

    SolrDocument sdoc = LEADER.getById("100"); // RTG straight from the index
    assertEquals(sdoc.toString(), inplace_updatable_float, sdoc.get("inplace_updatable_float"));
    assertEquals(sdoc.toString(), title, sdoc.get("title_s"));
    assertEquals(sdoc.toString(), version, sdoc.get("_version_"));

    // randomly interleave a full (non-inplace) re-index of the doc
    if (r.nextBoolean()) {
      title = "newtitle100";
      currentVersion =
          addDocAndGetVersion(
              "id",
              100,
              "title_s",
              title,
              "inplace_updatable_float",
              inplace_updatable_float); // full indexing
      assertTrue(currentVersion > version);
      version = currentVersion;

      sdoc = LEADER.getById("100"); // RTG from the tlog
      assertEquals(sdoc.toString(), inplace_updatable_float, sdoc.get("inplace_updatable_float"));
      assertEquals(sdoc.toString(), title, sdoc.get("title_s"));
      assertEquals(sdoc.toString(), version, sdoc.get("_version_"));

      // we've done a full index, so we need to update the [docid] for each replica
      LEADER.commit(); // can't get (real) [docid] from the tlogs, need to force a commit
      docids = getInternalDocIds("100");
    }

    inplace_updatable_float++;
    currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1));
    assertTrue(currentVersion > version);
    version = currentVersion;
    LEADER.commit();
    assertEquals(
        "Earlier: " + docids + ", now: " + getInternalDocIds("100"),
        docids,
        getInternalDocIds("100"));

    currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_int", map("set", "100"));
    assertTrue(currentVersion > version);
    version = currentVersion;

    inplace_updatable_float++;
    currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1));
    assertTrue(currentVersion > version);
    version = currentVersion;

    // set operation with invalid value for field
    SolrException e =
        expectThrows(
            SolrException.class,
            () ->
                addDocAndGetVersion(
                    "id", 100, "inplace_updatable_float", map("set", "NOT_NUMBER")));
    assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, e.code());
    assertThat(e.getMessage(), containsString("For input string: \"NOT_NUMBER\""));

    // inc operation with invalid inc value
    e =
        expectThrows(
            SolrException.class,
            () ->
                addDocAndGetVersion("id", 100, "inplace_updatable_int", map("inc", "NOT_NUMBER")));
    assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, e.code());
    assertThat(e.getMessage(), containsString("For input string: \"NOT_NUMBER\""));

    // RTG from tlog(s): the last two updates are uncommitted, yet every client must see them
    for (SolrClient client : clients) {
      final String clientDebug =
          client.toString() + (LEADER.equals(client) ? " (leader)" : " (not leader)");
      sdoc = client.getById("100", params("distrib", "false"));

      assertEquals(clientDebug + " => " + sdoc, 100, sdoc.get("inplace_updatable_int"));
      assertEquals(
          clientDebug + " => " + sdoc,
          inplace_updatable_float,
          sdoc.get("inplace_updatable_float"));
      assertEquals(clientDebug + " => " + sdoc, title, sdoc.get("title_s"));
      assertEquals(clientDebug + " => " + sdoc, version, sdoc.get("_version_"));
    }

    // assert that the internal docid for id=100 document remains same, in each replica, as before
    LEADER.commit(); // can't get (real) [docid] from the tlogs, need to force a commit
    assertEquals(
        "Earlier: " + docids + ", now: " + getInternalDocIds("100"),
        docids,
        getInternalDocIds("100"));

    log.info("ensureRtgWorksWithPartialUpdatesTest: This test passed fine...");
  }

  /**
   * Collects the internal Lucene "[docid]" for the given uniqueKey from every client used by this
   * test, via non-distributed RTG requests (result order matches the clients list).
   */
  private List<Integer> getInternalDocIds(String id) throws SolrServerException, IOException {
    final List<Integer> internalIds = new ArrayList<>(clients.size());
    for (SolrClient c : clients) {
      final SolrDocument rtgDoc = c.getById(id, params("distrib", "false", "fl", "[docid]"));
      final Object rawDocid = rtgDoc.get("[docid]");
      assertNotNull(rawDocid);
      assertEquals(Integer.class, rawDocid.getClass());
      internalIds.add((Integer) rawDocid);
    }
    assertEquals(clients.size(), internalIds.size());
    return internalIds;
  }

  /**
   * Sends one full update followed by a random number of dependent in-place updates for doc 0:
   * well-ordered to NONLEADER 1, randomly shuffled (via parallel threads, since a reordered
   * dependent update blocks waiting on its predecessor) to NONLEADER 0. Asserts all requests
   * succeed and that both replicas converge to the same final field values and version.
   */
  private void outOfOrderUpdatesIndividualReplicaTest() throws Exception {
    clearIndex();
    commit();

    buildRandomIndex(0);

    float inplace_updatable_float = 1;
    // update doc, set
    index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));

    LEADER.commit();
    SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
    assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
    assertEquals("title0", sdoc.get("title_s"));
    long version0 = (long) sdoc.get("_version_");

    // put replica out of sync
    float newinplace_updatable_float = 100;
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        simulatedUpdateRequest(
            null,
            "id",
            0,
            "title_s",
            "title0_new",
            "inplace_updatable_float",
            newinplace_updatable_float,
            "_version_",
            version0 + 1)); // full update
    // Fix: draw the random bound once, up front. Calling atLeast(3) inside the loop condition
    // re-rolled a fresh random bound on every iteration, so the number of simulated in-place
    // updates was decided by repeated draws rather than a single reproducible one.
    final int numSimulatedUpdates = atLeast(3);
    for (int i = 1; i < numSimulatedUpdates; i++) {
      updates.add(
          simulatedUpdateRequest(
              version0 + i,
              "id",
              0,
              "inplace_updatable_float",
              newinplace_updatable_float + i,
              "_version_",
              version0 + i + 1));
    }

    // order the updates correctly for NONLEADER 1
    for (UpdateRequest update : updates) {
      if (log.isInfoEnabled()) {
        log.info("Issuing well ordered update: {}", update.getDocuments());
      }
      NONLEADERS.get(1).request(update);
    }

    // Reordering needs to happen using parallel threads, since some of these updates will
    // be blocking calls, waiting for some previous update operations to arrive on which it depends.
    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));

    // re-order the updates for NONLEADER 0
    List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
    Collections.shuffle(reorderedUpdates, r);
    List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
    for (UpdateRequest update : reorderedUpdates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
      updateResponses.add(threadpool.submit(task));
      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(10);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    // assert all requests were successful
    for (Future<UpdateResponse> resp : updateResponses) {
      assertEquals(0, resp.get().getStatus());
    }

    // assert both replicas have same effect
    for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
      if (log.isInfoEnabled()) {
        log.info("Testing client: {}", ((HttpSolrClient) client).getBaseURL());
      }
      assertReplicaValue(
          client,
          0,
          "inplace_updatable_float",
          (newinplace_updatable_float + (float) (updates.size() - 1)),
          "inplace_updatable_float didn't match for replica at client: "
              + ((HttpSolrClient) client).getBaseURL());
      assertReplicaValue(
          client,
          0,
          "title_s",
          "title0_new",
          "Title didn't match for replica at client: " + ((HttpSolrClient) client).getBaseURL());
      assertEquals(version0 + updates.size(), getReplicaValue(client, 0, "_version_"));
    }

    log.info("outOfOrderUpdatesIndividualReplicaTest: This test passed fine...");
  }

  // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0
  /**
   * Verifies that a replica receiving a (full update, dependent in-place update, delete) sequence
   * in a random order converges to the same state as one that received it well-ordered: doc 0
   * ends up deleted on both non-leader replicas.
   */
  private void reorderedDeletesTest() throws Exception {

    clearIndex();
    commit();

    buildRandomIndex(0);

    float inplace_updatable_float = 1;
    // update doc, set
    index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float));

    LEADER.commit();
    SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
    assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float"));
    assertEquals("title0", sdoc.get("title_s"));
    long version0 = (long) sdoc.get("_version_");

    // put replica out of sync
    float newinplace_updatable_float = 100;
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        simulatedUpdateRequest(
            null,
            "id",
            0,
            "title_s",
            "title0_new",
            "inplace_updatable_float",
            newinplace_updatable_float,
            "_version_",
            version0 + 1)); // full update
    updates.add(
        simulatedUpdateRequest(
            version0 + 1,
            "id",
            0,
            "inplace_updatable_float",
            newinplace_updatable_float + 1,
            "_version_",
            version0 + 2)); // inplace_updatable_float=101
    // NOTE: version0+3 is deliberately one past the last update, so the delete wins regardless
    // of arrival order.
    updates.add(simulatedDeleteRequest(0, version0 + 3));

    // order the updates correctly for NONLEADER 1
    for (UpdateRequest update : updates) {
      if (log.isInfoEnabled()) {
        log.info("Issuing well ordered update: {}", update.getDocuments());
      }
      NONLEADERS.get(1).request(update);
    }

    // Reordering needs to happen using parallel threads, since a reordered dependent update
    // blocks until the update it depends on has arrived
    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));

    // re-order the updates for NONLEADER 0
    List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
    Collections.shuffle(reorderedUpdates, r);
    List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
    for (UpdateRequest update : reorderedUpdates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
      updateResponses.add(threadpool.submit(task));
      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(10);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    // assert all requests were successful
    for (Future<UpdateResponse> resp : updateResponses) {
      assertEquals(0, resp.get().getStatus());
    }

    // assert both replicas have same effect
    for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
      SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
      assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
    }

    log.info("reorderedDeletesTest: This test passed fine...");
  }

  /* Test for a situation when a document requiring in-place update cannot be "resurrected"
  * when the original full indexed document has been deleted by an out of order DBQ.
  * Expected behaviour in this case should be to throw the replica into LIR (since this will
  * be rare). Here's an example of the situation:
       ADD(id=x, val=5, ver=1)
       UPD(id=x, val=10, ver = 2)
       DBQ(q=val:10, v=4)
       DV(id=x, val=5, ver=3)
  */
  private void reorderedDBQsResurrectionTest() throws Exception {
    if (onlyLeaderIndexes) {
      log.info("RTG with DBQs are not working in tlog replicas");
      return;
    }
    clearIndex();
    commit();

    buildRandomIndex(0);

    SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index
    // assertEquals(value, sdoc.get("inplace_updatable_float"));
    assertEquals("title0", sdoc.get("title_s"));
    long version0 = (long) sdoc.get("_version_");

    String field = "inplace_updatable_int";

    // put replica out of sync
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        simulatedUpdateRequest(
            null,
            "id",
            0,
            "title_s",
            "title0_new",
            field,
            5,
            "_version_",
            version0 + 1)); // full update
    updates.add(
        simulatedUpdateRequest(
            version0 + 1,
            "id",
            0,
            field,
            10,
            "_version_",
            version0 + 2)); // in-place: inplace_updatable_int=10
    updates.add(
        simulatedUpdateRequest(
            version0 + 2,
            "id",
            0,
            field,
            5,
            "_version_",
            version0 + 3)); // in-place: inplace_updatable_int back to 5
    updates.add(
        simulatedDeleteRequest(field + ":10", version0 + 4)); // supposed to not delete anything

    // order the updates correctly for NONLEADER 1
    for (UpdateRequest update : updates) {
      if (log.isInfoEnabled()) {
        log.info("Issuing well ordered update: {}", update.getDocuments());
      }
      NONLEADERS.get(1).request(update);
    }

    // Reordering needs to happen using parallel threads
    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));
    // re-order the last two updates for NONLEADER 0
    List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates);
    Collections.swap(reorderedUpdates, 2, 3);

    List<Future<UpdateResponse>> updateResponses = new ArrayList<>();
    for (UpdateRequest update : reorderedUpdates) {
      // pretend as this update is coming from the other non-leader, so that
      // the resurrection can happen from there (instead of the leader)
      update.setParam(
          DistributedUpdateProcessor.DISTRIB_FROM,
          ((HttpSolrClient) NONLEADERS.get(1)).getBaseURL()
              + "/"
              + NONLEADERS.get(1).getDefaultCollection());
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong());
      updateResponses.add(threadpool.submit(task));
      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(10);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    int successful = 0;
    for (Future<UpdateResponse> resp : updateResponses) {
      try {
        UpdateResponse r = resp.get();
        if (r.getStatus() == 0) {
          successful++;
        }
      } catch (Exception ex) {
        // reordered DBQ should trigger an error, thus throwing the replica into LIR.
        // the cause of the error is that the full document was deleted by mistake due to the
        // out of order DBQ, and the in-place update that arrives after the DBQ (but was supposed to
        // arrive before) cannot be applied, since the full document can't now be "resurrected".

        // Fix: guard against a null message (some wrapped exceptions carry none) before matching,
        // so the original failure is rethrown instead of being masked by an NPE here.
        final String msg = ex.getMessage();
        if (msg == null
            || !msg.contains(
                "Tried to fetch missing update"
                    + " from the leader, but missing wasn't present at leader.")) {
          throw ex;
        }
      }
    }
    // All should succeed, i.e. no LIR
    assertEquals(updateResponses.size(), successful);

    if (log.isInfoEnabled()) {
      log.info("Non leader 0: {}", ((HttpSolrClient) NONLEADERS.get(0)).getBaseURL());
      log.info("Non leader 1: {}", ((HttpSolrClient) NONLEADERS.get(1)).getBaseURL()); // nowarn
    }

    SolrDocument doc0 = NONLEADERS.get(0).getById(String.valueOf(0), params("distrib", "false"));
    SolrDocument doc1 = NONLEADERS.get(1).getById(String.valueOf(0), params("distrib", "false"));

    log.info("Doc in both replica 0: {}", doc0);
    log.info("Doc in both replica 1: {}", doc1);
    // assert both replicas have same effect
    for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica
      SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false"));
      assertNotNull("Client: " + ((HttpSolrClient) client).getBaseURL(), doc);
      assertEquals(
          "Client: " + ((HttpSolrClient) client).getBaseURL(), 5, doc.getFieldValue(field));
    }

    log.info("reorderedDBQsResurrectionTest: This test passed fine...");
    clearIndex();
    commit();
  }

  /**
   * Delays delivery of one update to a replica long enough that a dependent in-place update times
   * out waiting and instead fetches the full document from the leader. Asserts no replica goes
   * down (i.e. no LIR), values converge, and — in a second round ending with a delete — that the
   * missing-update fetch also copes with the document having been deleted on the leader.
   */
  private void delayedReorderingFetchesMissingUpdateFromLeaderTest() throws Exception {
    clearIndex();
    commit();

    float inplace_updatable_float = 1F;
    buildRandomIndex(inplace_updatable_float, Collections.singletonList(1));

    float newinplace_updatable_float = 100F;
    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        regularUpdateRequest(
            "id",
            1,
            "title_s",
            "title1_new",
            "id_i",
            1,
            "inplace_updatable_float",
            newinplace_updatable_float));
    updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1)));
    updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1)));

    // The next request to replica2 will be delayed (timeout is 5s)
    shardToJetty
        .get(SHARD1)
        .get(1)
        .jetty
        .getDebugFilter()
        .addDelay("Waiting for dependant update to timeout", 1, 6000);

    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));
    for (UpdateRequest update : updates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong());
      threadpool.submit(task);

      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(100);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 15 secs",
        threadpool.awaitTermination(15, TimeUnit.SECONDS));

    commit();

    // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
    // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
    for (int i = 0; i < 100; i++) {
      Thread.sleep(10);
      ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION);
      ClusterState state = cloudClient.getClusterState();

      int numActiveReplicas = 0;
      for (Replica rep : state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
        if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++;

      assertEquals(
          "The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
    }

    for (SolrClient client : clients) {
      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
      try {
        timeout.waitFor(
            "Timeout",
            () -> {
              try {
                return (float) getReplicaValue(client, 1, "inplace_updatable_float")
                    == newinplace_updatable_float + 2.0f;
              } catch (SolrServerException | IOException e) {
                // collapse the two identical handlers into one multi-catch
                throw new RuntimeException(e);
              }
            });
      } catch (TimeoutException e) {
        // Fix: don't silently swallow the timeout — the authoritative assertReplicaValue checks
        // below will still fail if the value never converged, but logging here explains why.
        log.warn("Timed out waiting for inplace_updatable_float to converge on a client", e);
      }
    }

    for (SolrClient client : clients) {
      if (log.isInfoEnabled()) {
        log.info("Testing client (Fetch missing test): {}", ((HttpSolrClient) client).getBaseURL());
        log.info(
            "Version at {} is: {}",
            ((HttpSolrClient) client).getBaseURL(),
            getReplicaValue(client, 1, "_version_")); // nowarn
      }
      assertReplicaValue(
          client,
          1,
          "inplace_updatable_float",
          (newinplace_updatable_float + 2.0f),
          "inplace_updatable_float didn't match for replica at client: "
              + ((HttpSolrClient) client).getBaseURL());
      assertReplicaValue(
          client,
          1,
          "title_s",
          "title1_new",
          "Title didn't match for replica at client: " + ((HttpSolrClient) client).getBaseURL());
    }

    // Try another round of these updates, this time with a delete request at the end.
    // This is to ensure that the fetch missing update from leader doesn't bomb out if the
    // document has been deleted on the leader later on
    {
      clearIndex();
      commit();
      shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().unsetDelay();

      updates.add(regularDeleteRequest(1));

      shardToJetty
          .get(SHARD1)
          .get(1)
          .jetty
          .getDebugFilter()
          .addDelay("Waiting for dependant update to timeout", 1, 5999); // the first update
      shardToJetty
          .get(SHARD1)
          .get(1)
          .jetty
          .getDebugFilter()
          .addDelay("Waiting for dependant update to timeout", 4, 5998); // the delete update

      threadpool =
          ExecutorUtil.newMDCAwareFixedThreadPool(
              updates.size() + 1, new SolrNamedThreadFactory(getTestName()));
      for (UpdateRequest update : updates) {
        AsyncUpdateWithRandomCommit task =
            new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong());
        threadpool.submit(task);

        // while we can't guarantee/trust what order the updates are executed in, since multiple
        // threads are involved, but we're trying to bias the thread scheduling to run them in the
        // order submitted
        Thread.sleep(100);
      }

      threadpool.shutdown();
      assertTrue(
          "Thread pool didn't terminate within 15 secs",
          threadpool.awaitTermination(15, TimeUnit.SECONDS));

      commit();

      try (ZkShardTerms zkShardTerms =
          new ZkShardTerms(
              DEFAULT_COLLECTION, SHARD1, ZkStateReader.from(cloudClient).getZkClient())) {
        for (int i = 0; i < 100; i++) {
          Thread.sleep(10);
          ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION);
          ClusterState state = cloudClient.getClusterState();

          int numActiveReplicas = 0;
          for (Replica rep :
              state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
            assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
            if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++;
          }
          assertEquals(
              "The replica receiving reordered updates must not have gone down",
              3,
              numActiveReplicas);
        }
      }

      for (SolrClient client :
          new SolrClient[] {
            LEADER, NONLEADERS.get(0), NONLEADERS.get(1)
          }) { // nonleader 0 re-ordered replica, nonleader 1 well-ordered replica
        SolrDocument doc = client.getById(String.valueOf(1), params("distrib", "false"));
        assertNull("This doc was supposed to have been deleted, but was: " + doc, doc);
      }
    }
    log.info("delayedReorderingFetchesMissingUpdateFromLeaderTest: This test passed fine...");
  }

  /**
   * Fetches the named field via the Schema API and asserts its attributes exactly match the
   * expected map (which must contain a "name" entry identifying the field).
   */
  public void checkExpectedSchemaField(Map<String, Object> expected) throws Exception {
    final String fieldName = (String) expected.get("name");
    assertNotNull("expected contains no name: " + expected, fieldName);
    final FieldResponse response = new Field(fieldName).process(this.cloudClient);
    assertNotNull("Field Null Response: " + fieldName, response);
    assertEquals("Field Status: " + fieldName + " => " + response, 0, response.getStatus());
    assertEquals("Field: " + fieldName, expected, response.getField());
  }

  /**
   * Callable that processes a single update against the given client, then (with probability
   * 1/commitBound) issues a commit. A dedicated seeded Random makes each task's commit decision
   * independent of the shared test-random stream.
   */
  private class AsyncUpdateWithRandomCommit implements Callable<UpdateResponse> {
    // Fields made final: they are assigned once in the constructor and never mutated.
    final UpdateRequest update;
    final SolrClient solrClient;
    final Random rnd;
    // Commit much less often for tlog-only replica configs (commits are costlier there).
    final int commitBound = onlyLeaderIndexes ? 50 : 3;

    public AsyncUpdateWithRandomCommit(UpdateRequest update, SolrClient solrClient, long seed) {
      this.update = update;
      this.solrClient = solrClient;
      this.rnd = new Random(seed);
    }

    @Override
    public UpdateResponse call() throws Exception {
      UpdateResponse resp = update.process(solrClient);
      if (rnd.nextInt(commitBound) == 0) {
        solrClient.commit();
      }
      return resp;
    }
  }

  /**
   * Non-distributed RTG of the given doc from one client; returns the value of {@code field}, or
   * null when the doc does not exist on that replica.
   */
  Object getReplicaValue(SolrClient client, int doc, String field)
      throws SolrServerException, IOException {
    final SolrDocument fetched = client.getById(String.valueOf(doc), params("distrib", "false"));
    if (fetched == null) {
      return null;
    }
    return fetched.get(field);
  }

  /** Asserts that this replica's value of {@code field} for {@code doc} equals {@code expected}. */
  void assertReplicaValue(SolrClient client, int doc, String field, Object expected, String message)
      throws SolrServerException, IOException {
    final Object actual = getReplicaValue(client, doc, field);
    assertEquals(message, expected, actual);
  }

  /**
   * Builds an UpdateRequest carrying the given document fields, dressed up with the params a
   * replica would see on a request forwarded from its shard leader. A non-null {@code prevVersion}
   * additionally marks it as a dependent in-place update.
   */
  UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws IOException {
    final SolrInputDocument doc = sdoc(fields);

    // resolve the core URL of the shard leader for this doc's id
    final String leaderUrl = getBaseUrl(doc.get("id").toString());

    final UpdateRequest req = new UpdateRequest();
    req.add(doc);
    req.setParam("update.distrib", "FROMLEADER");
    if (prevVersion != null) {
      req.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
      req.setParam("distrib.inplace.update", "true");
    }
    req.setParam("distrib.from", leaderUrl);
    return req;
  }

  /**
   * Builds a delete (randomly delete-by-id or delete-by-query; always by-id for tlog replicas)
   * for the given doc, dressed up as if forwarded from the shard leader with the given version.
   */
  UpdateRequest simulatedDeleteRequest(int id, long version) throws IOException {
    final String idStr = String.valueOf(id);
    final String leaderUrl = getBaseUrl(idStr);

    final UpdateRequest req = new UpdateRequest();
    // NOTE: random() is always consumed here, keeping the shared random stream stable
    if (random().nextBoolean() || onlyLeaderIndexes) {
      req.deleteById(idStr);
    } else {
      req.deleteByQuery("id:" + idStr);
    }
    req.setParam("_version_", String.valueOf(version));
    req.setParam("update.distrib", "FROMLEADER");
    req.setParam("distrib.from", leaderUrl);
    return req;
  }

  /**
   * Builds a delete-by-query with the given version, dressed up as if forwarded from the shard
   * leader of the default collection.
   */
  UpdateRequest simulatedDeleteRequest(String query, long version) {
    final String leaderRootUrl = getBaseUrl((HttpSolrClient) LEADER);

    final UpdateRequest req = new UpdateRequest();
    req.deleteByQuery(query);
    req.setParam("_version_", String.valueOf(version));
    req.setParam("update.distrib", "FROMLEADER");
    req.setParam("distrib.from", leaderRootUrl + DEFAULT_COLLECTION + "/");
    return req;
  }

  /** Resolves the core URL of the shard leader responsible for the given doc id. */
  private String getBaseUrl(String id) {
    final DocCollection coll = cloudClient.getClusterState().getCollection(DEFAULT_COLLECTION);
    final Slice targetSlice = coll.getRouter().getTargetSlice(id, null, null, null, coll);
    return targetSlice.getLeader().getCoreUrl();
  }

  /**
   * Strips the trailing "/&lt;collection&gt;" segment from the client's complete Solr URL,
   * yielding the root URL used for admin API calls.
   */
  protected String getBaseUrl(HttpSolrClient client) {
    final String fullUrl = client.getBaseURL();
    final int rootLength = fullUrl.length() - DEFAULT_COLLECTION.length() - 1;
    return fullUrl.substring(0, rootLength);
  }

  /** Builds a plain (non-simulated, client-style) update request adding a doc with the given fields. */
  UpdateRequest regularUpdateRequest(Object... fields) {
    final UpdateRequest req = new UpdateRequest();
    req.add(sdoc(fields));
    return req;
  }

  /** Builds a plain delete-by-id request for the given numeric id. */
  UpdateRequest regularDeleteRequest(int id) {
    final UpdateRequest req = new UpdateRequest();
    req.deleteById(String.valueOf(id));
    return req;
  }

  /** Builds a plain delete-by-query request for the given query string. */
  UpdateRequest regularDeleteByQueryRequest(String q) {
    final UpdateRequest req = new UpdateRequest();
    req.deleteByQuery(q);
    return req;
  }

  /**
   * Indexes a doc built from the given fields (sent directly to the leader — see SOLR-8733) with
   * "versions=true", and returns the version the server assigned; asserts it is positive.
   */
  protected long addDocAndGetVersion(Object... fields) throws Exception {
    final SolrInputDocument doc = new SolrInputDocument();
    addFields(doc, fields);

    final UpdateRequest req = new UpdateRequest();
    req.setParam("versions", "true");
    req.add(doc);

    // send updates to leader, to avoid SOLR-8733
    final UpdateResponse resp = req.process(LEADER);

    final NamedList<?> adds = (NamedList<?>) resp.getResponse().get("adds");
    final long returnedVersion = Long.parseLong(adds.getVal(0).toString());
    assertTrue(
        "Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
            + " worked around that problem here.",
        returnedVersion > 0);
    return returnedVersion;
  }

  /**
   * Convenience variant of {@link #buildRandomIndex(Float,List)} that passes a null
   * <code>initFloat</code>, so no special document gets an initial "inplace_updatable_float".
   */
  protected List<Long> buildRandomIndex(Integer... specialIds) throws Exception {
    return buildRandomIndex(null, Arrays.asList(specialIds));
  }

  /**
   * Helper method to build a randomized index with the fields needed for all test methods in this
   * class. At a minimum, this index will contain 1 doc per "special" (non-negative) document id.
   * These special documents will be added with the <code>initFloat</code> specified in the
   * "inplace_updatable_float" field.
   *
   * <p>A random number of documents (with negative ids) will be indexed in between each of the
   * "special" documents, as well as before/after the first/last special document.
   *
   * @param initFloat Value to use in the "inplace_updatable_float" for the special documents; will
   *     never be used if null
   * @param specialIds The ids to use for the special documents, all values must be non-negative
   * @return the versions of each of the specials document returned when indexing it
   */
  protected List<Long> buildRandomIndex(Float initFloat, List<Integer> specialIds)
      throws Exception {

    int id = -1; // used for non-special docs
    // keep the filler batch small for tlog-only configs (or rarely, for variety); otherwise
    // index a healthy batch
    final int numPreDocs =
        rarely() || onlyLeaderIndexes ? TestUtil.nextInt(random(), 0, 9) : atLeast(10);
    for (int i = 1; i <= numPreDocs; i++) {
      addDocAndGetVersion("id", id, "title_s", "title" + id, "id_i", id);
      id--;
    }
    final List<Long> versions = new ArrayList<>(specialIds.size());
    for (int special : specialIds) {
      // index the special doc itself, with or without the initial float value
      if (null == initFloat) {
        versions.add(
            addDocAndGetVersion("id", special, "title_s", "title" + special, "id_i", special));
      } else {
        versions.add(
            addDocAndGetVersion(
                "id",
                special,
                "title_s",
                "title" + special,
                "id_i",
                special,
                "inplace_updatable_float",
                initFloat));
      }
      // filler docs after each special doc (hence also after the last one)
      final int numPostDocs =
          rarely() || onlyLeaderIndexes ? TestUtil.nextInt(random(), 0, 2) : atLeast(10);
      for (int i = 1; i <= numPostDocs; i++) {
        addDocAndGetVersion("id", id, "title_s", "title" + id, "id_i", id);
        id--;
      }
    }
    LEADER.commit();

    assertEquals(specialIds.size(), versions.size());
    return versions;
  }

  /*
   * Situation:
   * add(id=1,inpfield=12,title=mytitle,version=1)
   * inp(id=1,inpfield=13,prevVersion=1,version=2) // timeout indefinitely
   * inp(id=1,inpfield=14,prevVersion=2,version=3) // will wait till timeout, and then fetch a "not found" from leader
   * dbq("inp:14",version=4)
   */
  private void reorderedDBQsUsingUpdatedValueFromADroppedUpdate() throws Exception {
    // Verifies that a DBQ matching a value produced by a dropped (timed-out) in-place update
    // still ends with the doc deleted everywhere, and no replica thrown into recovery.
    if (onlyLeaderIndexes) {
      log.info("RTG with DBQs are not working in tlog replicas");
      return;
    }
    clearIndex();
    commit();

    float inplace_updatable_float = 1F;
    buildRandomIndex(inplace_updatable_float, Collections.singletonList(1));

    List<UpdateRequest> updates = new ArrayList<>();
    updates.add(
        regularUpdateRequest(
            "id", 1, "id_i", 1, "inplace_updatable_float", 12, "title_s", "mytitle"));
    updates.add(
        regularUpdateRequest(
            "id", 1, "inplace_updatable_float", map("inc", 1))); // delay indefinitely
    updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1)));
    updates.add(regularDeleteByQueryRequest("inplace_updatable_float:14"));

    // The second request will be delayed very, very long, so that the next update actually gives up
    // waiting for this and fetches a full update from the leader.
    shardToJetty
        .get(SHARD1)
        .get(1)
        .jetty
        .getDebugFilter()
        .addDelay("Waiting for dependant update to timeout", 2, 8000);

    ExecutorService threadpool =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            updates.size() + 1, new SolrNamedThreadFactory(getTestName()));
    for (UpdateRequest update : updates) {
      AsyncUpdateWithRandomCommit task =
          new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong());
      threadpool.submit(task);

      // while we can't guarantee/trust what order the updates are executed in, since multiple
      // threads are involved, but we're trying to bias the thread scheduling to run them in the
      // order submitted
      Thread.sleep(100);
    }

    threadpool.shutdown();
    assertTrue(
        "Thread pool didn't terminate within 12 secs",
        threadpool.awaitTermination(12, TimeUnit.SECONDS));

    commit();

    // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
    // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
    for (int i = 0; i < 100; i++) {
      Thread.sleep(10);
      ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION);
      ClusterState state = cloudClient.getClusterState();

      int numActiveReplicas = 0;
      for (Replica rep : state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
        if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++;

      assertEquals(
          "The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
    }

    for (SolrClient client : clients) {
      if (log.isInfoEnabled()) {
        log.info(
            "Testing client (testDBQUsingUpdatedFieldFromDroppedUpdate): {}",
            ((HttpSolrClient) client).getBaseURL());
        log.info(
            "Version at {} is: {}",
            ((HttpSolrClient) client).getBaseURL(),
            getReplicaValue(client, 1, "_version_")); // nowarn
      }
      // the DBQ must have deleted the doc on every replica
      assertNull(client.getById("1", params("distrib", "false")));
    }

    log.info("reorderedDBQsUsingUpdatedValueFromADroppedUpdate: This test passed fine...");
  }

  /**
   * Clears the index via the base class, then additionally issues a simulated leader-forwarded
   * delete-by-query (with the lowest possible version so it cannot be reordered away) followed by
   * a commit on the leader and both non-leaders.
   */
  @Override
  public void clearIndex() {
    super.clearIndex();
    try {
      for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) {
        if (client == null) {
          continue;
        }
        client.request(simulatedDeleteRequest("*:*", -Long.MAX_VALUE));
        client.commit();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
